package com.weixin.gong.example.kafka;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Example of a single-threaded consumer built on Kafka's ZooKeeper-based high-level consumer API.
 *
 * <p>Reads the {@code weixin} topic as consumer group {@code wx}, prints every message to
 * standard output and commits offsets manually (auto-commit is switched off).
 *
 * @author weixin.gong
 * @date 15-7-3 5:30 PM
 */
public class KafKaSingleConsumer {

    /** Topic this example consumes. */
    private static final String TOPIC = "weixin";

    /**
     * Charset used to decode message payloads. Named explicitly so the output does not depend
     * on the platform default charset.
     * NOTE(review): assumes the producer writes UTF-8 text - confirm against the producer.
     */
    private static final Charset MESSAGE_CHARSET = Charset.forName("UTF-8");

    /**
     * Creates the consumer connector, consumes the topic and always releases the connector
     * when consumption ends, whether normally or because of an exception.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ConsumerConfig conf = new ConsumerConfig(buildConsumerProperties());
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf);
        try {
            consume(consumer);
        } finally {
            // Runs when consume() returns or throws, so the connector is never left open.
            consumer.shutdown();
        }
    }

    /** Builds the configuration for the high-level consumer. */
    private static Properties buildConsumerProperties() {
        Properties props = new Properties();
        // Required if old (already stored) data should be read.
        props.put("auto.offset.reset", "smallest");
        props.put("zookeeper.connect", "zk.dev.corp.qunar.com:2181/fuwu_kafka");
        props.put("group.id", "wx");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        // Interval between automatic offset commits.
        props.put("auto.commit.interval.ms", "1000");
        // A timeout exception is raised when no new message arrives within this time.
        props.put("consumer.timeout.ms", "1000");
        // Whether offsets are committed automatically; a commit marks the position this
        // consumer has consumed up to.
        props.put("auto.commit.enable", "false");
        return props;
    }

    /**
     * Opens one stream for {@link #TOPIC} and prints each message together with a running
     * count. Keeps polling after a consumer timeout and returns only when the stream reports
     * that it has no more elements.
     *
     * @param consumer connector used to create the stream and to commit offsets
     */
    private static void consume(ConsumerConnector consumer) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // Topic mapped to the number of threads (streams) that handle it.
        topicCountMap.put(TOPIC, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        // Streams of the topic: one stream per thread, so exactly one is requested here.
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TOPIC);

        KafkaStream<byte[], byte[]> stream = streams.get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        int count = 0;
        while (true) {
            try {
                if (!it.hasNext()) {
                    // No more elements will arrive; calling hasNext() again would only
                    // busy-spin, so stop consuming.
                    return;
                }
                MessageAndMetadata<byte[], byte[]> message = it.next();
                byte[] payload = message.message();
                // A null payload is printed as "null" instead of failing inside new String(...).
                String text = payload == null ? null : new String(payload, MESSAGE_CHARSET);
                System.out.println("message:" + text);
                System.out.println("count:" + count);
                if (count == 0) {
                    // Manual commit; the committed position is where the iterator currently is.
                    // Only the first message triggers a commit - presumably deliberate for
                    // this demonstration.
                    consumer.commitOffsets();
                }
                count++;
            } catch (ConsumerTimeoutException e) {
                // Expected whenever no message arrives within consumer.timeout.ms: report it
                // briefly and poll again rather than dumping a stack trace every time.
                System.err.println("No message received within consumer.timeout.ms, polling again");
            }
        }
    }
}
